package springboot.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

@Configuration
public class KafkaInitialConfiguration {

    /**
     * 监听器工厂
     */
    @Autowired
    private ConsumerFactory<String,String> consumerFactory;

    /**
     * @return 配置一个消息过滤策略
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> myFilterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        //设置消息过滤策略
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                //这里做逻辑判断
                //返回true的消息将会被丢弃
                return true;
            }
        });
        return factory;
    }
}